package com.atbeijing.D03;

import com.atbeijing.D02.SensorReading;
import com.atbeijing.D02.SensorSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Routes every reading into one keyed stream and then applies a window to it.
 * The result is much the same as using {@code windowAll}.
 *
 * <p>Pipeline: {@code SensorSource} -> constant key -> 5-second tumbling
 * processing-time window -> sum of temperatures -> print.
 */
public class Example5 {

    /** Length, in seconds, of each tumbling processing-time window. */
    private static final long WINDOW_SECONDS = 5L;

    /**
     * Builds the job graph and runs it.
     *
     * @param args command-line arguments (not read)
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);

        env
                .addSource(new SensorSource())
                // Every record gets the same key, so they all share one keyed stream.
                .keyBy(new ConstantKey())
                // Tumbling window driven by machine (processing) time.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .reduce(new TemperatureSum())
                .print();

        env.execute();
    }

    /** Key selector that assigns the same key ({@code true}) to every reading. */
    private static final class ConstantKey implements KeySelector<SensorReading, Boolean> {
        @Override
        public Boolean getKey(SensorReading value) throws Exception {
            return Boolean.TRUE;
        }
    }

    /**
     * Combines two readings into a new one whose temperature is the sum of both.
     * The combined reading always carries the id {@code "sensor"} and a timestamp of 0.
     */
    private static final class TemperatureSum implements ReduceFunction<SensorReading> {
        @Override
        public SensorReading reduce(SensorReading left, SensorReading right) throws Exception {
            return new SensorReading("sensor", left.temperature + right.temperature, 0L);
        }
    }
}
